fix: handle DrainIngress in fake_data_generator to unblock graceful shutdown#2515

Merged
jmacd merged 14 commits into open-telemetry:main from sjmsft:bug_2511
Apr 9, 2026

Conversation

@sjmsft
Contributor

@sjmsft sjmsft commented Apr 2, 2026

Change Summary

The "Ack nack redesign" PR (3dca283) introduced a two-phase DrainIngress/ReceiverDrained shutdown protocol but missed updating the fake_data_generator receiver. Without the DrainIngress handler, the message falls into the _ => {} catch-all, notify_receiver_drained() is never called, the pipeline controller never removes the receiver from its pending set, and after the deadline expires it emits DrainDeadlineReached. This was causing pipeline-perf-test-basic to fail consistently.
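The failure mode can be illustrated with a minimal, self-contained sketch. The enum, handler, and `drained` flag below are simplified stand-ins for the runtime's actual API, mirroring only the names mentioned in this PR:

```rust
// Minimal sketch of the dispatch bug described above. `NodeControlMsg`
// and the `drained` flag are simplified stand-ins for the real runtime API.
#[allow(dead_code)]
enum NodeControlMsg {
    DrainIngress { deadline_ms: u64 },
    Shutdown,
    CollectTelemetry,
}

// Returns true when the receiver should exit its event loop.
fn handle(msg: NodeControlMsg, drained: &mut bool) -> bool {
    match msg {
        // The arm the original code was missing: without it, DrainIngress
        // fell into the `_ => {}` catch-all, notify_receiver_drained() was
        // never called, and the controller eventually hit DrainDeadlineReached.
        NodeControlMsg::DrainIngress { .. } => {
            *drained = true; // stands in for notify_receiver_drained()
            true
        }
        NodeControlMsg::Shutdown => true,
        _ => false,
    }
}

fn main() {
    let mut drained = false;
    let exit = handle(NodeControlMsg::DrainIngress { deadline_ms: 200 }, &mut drained);
    println!("exit={exit} drained={drained}"); // exit=true drained=true
}
```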

What issue does this PR close?

pipeline-perf-test-basic unit test is failing.

How are these changes tested?

fake_data_generator and runtime_control_metrics tests were executed.

Are there any user-facing changes?

No, fake_data_generator is an internal test/load-generation receiver, not a user-facing component.

@sjmsft sjmsft requested a review from a team as a code owner April 2, 2026 16:46
@github-actions github-actions bot added the rust Pull requests that update Rust code label Apr 2, 2026
@codecov

codecov bot commented Apr 2, 2026

Codecov Report

❌ Patch coverage is 99.10714% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 88.39%. Comparing base (e1742e0) to head (108bc13).
⚠️ Report is 11 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2515      +/-   ##
==========================================
+ Coverage   88.37%   88.39%   +0.02%     
==========================================
  Files         620      622       +2     
  Lines      228395   230170    +1775     
==========================================
+ Hits       201836   203457    +1621     
- Misses      26035    26189     +154     
  Partials      524      524              
Components Coverage Δ
otap-dataflow 90.22% <99.10%> (+0.01%) ⬆️
query_abstraction 80.61% <ø> (ø)
query_engine 90.74% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 52.45% <ø> (ø)
quiver 92.27% <ø> (ø)

@lalitb
Member

lalitb commented Apr 2, 2026

The sequencing here looks off. In graceful shutdown the runtime does:

DrainIngress -> ReceiverDrained -> downstream Shutdown.

This change makes fake_data_generator do

DrainIngress -> ReceiverDrained -> wait for Shutdown,

but that Shutdown is not part of the normal post-drain receiver path. For this receiver, once ingress is stopped there is no receiver-local work left to preserve, so it should exit directly on DrainIngress rather than report drained and then block waiting for another shutdown message.

Member

@lalitb lalitb left a comment


Please go through the comment here.

@lalitb
Member

lalitb commented Apr 2, 2026

The correct fix should be:

DrainIngress -> notify_receiver_drained() -> return TerminalState immediately

Something like (not tested):

Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
    otel_info!("fake_data_generator.drain_ingress");
    effect_handler.notify_receiver_drained().await?;
    return Ok(TerminalState::new(deadline, [self.metrics.snapshot()]));
}

@lalitb
Member

lalitb commented Apr 2, 2026

The fix now looks correct. However, the CI failures point to a shutdown race in fake_data_generator that is easy to hit on slower runners. The test config uses signals_per_second = 1, so the receiver can sleep for close to 1 second between sends, while the shutdown deadline in test_telemetry_registries_cleanup is only 200 ms. That means DrainIngress can arrive while the receiver is asleep, the runtime can move into forced shutdown before the receiver handles it, and notify_receiver_drained().await? can then fail with "Channel is closed".

One option could be to address this in two places:

  • make the rate-limit sleep interruptible, since that looks like the root cause here.
if signals_per_second.is_some() {
    let remaining_time = wait_till - Instant::now();
    if remaining_time.as_secs_f64() > 0.0 {
        tokio::select! {
            biased;

            ctrl_msg = ctrl_msg_recv.recv() => {
                // handle DrainIngress / Shutdown during the rate-limit wait
                // using the same control-message handling as the main loop
            }

            _ = sleep(remaining_time) => {}
        }
    }
}
  • make notify_receiver_drained() best-effort on the terminal DrainIngress path, so a late control-plane teardown does not turn shutdown into an error.
Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
    otel_info!("fake_data_generator.drain_ingress");
    let _ = effect_handler.notify_receiver_drained().await;
    return Ok(TerminalState::new(deadline, [self.metrics.snapshot()]));
}

@sjmsft sjmsft requested a review from lalitb April 2, 2026 23:39
@lquerel
Contributor

lquerel commented Apr 4, 2026

@sjmsft

[in addition to the sequence described by @lalitb ]

The exception is deadline-forced shutdown. If the drain deadline expires before the receiver reports drained, the runtime sends NodeControlMsg::Shutdown { deadline, reason } to any still-pending receivers.
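That deadline-forced fallback can be sketched as follows. This is an illustrative, self-contained model of the controller-side decision lquerel describes, not the runtime's actual types (the function name and message strings are invented for the example):

```rust
use std::time::{Duration, Instant};

// Illustrative sketch of the deadline-forced path: the pipeline controller
// waits for ReceiverDrained until the drain deadline expires, then falls
// back to sending Shutdown to any still-pending receivers.
fn controller_step(drain_started: Instant, deadline: Duration, receiver_drained: bool) -> &'static str {
    if receiver_drained {
        "all drained: proceed to downstream Shutdown"
    } else if drain_started.elapsed() >= deadline {
        "DrainDeadlineReached: send NodeControlMsg::Shutdown to pending receivers"
    } else {
        "still waiting for ReceiverDrained"
    }
}

fn main() {
    let started = Instant::now();
    // Deadline already expired and the receiver never reported drained.
    println!("{}", controller_step(started, Duration::ZERO, false));
    // Receiver reported drained within the deadline.
    println!("{}", controller_step(started, Duration::from_secs(60), true));
}
```

This is why a receiver that handles DrainIngress late (or not at all) still gets torn down, but via the forced path rather than the graceful one.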

jmacd and others added 3 commits April 8, 2026 12:41
main.rs but the Dockerfile was not updated to copy the file from the
otel-arrow build context, breaking the Docker build.

Co-authored-by: Copilot <[email protected]>
Verifies the receiver handles DrainIngress promptly even while
sleeping in a rate-limit interval. Without the DrainIngress handler
the receiver would stall until the drain deadline expired, causing
DrainDeadlineReached.

Co-authored-by: Copilot <[email protected]>

Ok(NodeControlMsg::DrainIngress { deadline, .. }) => {
    otel_info!("fake_data_generator.drain_ingress");
    let _ = effect_handler.notify_receiver_drained().await;

Member

@lalitb lalitb Apr 8, 2026

Suggested change
let _ = effect_handler.notify_receiver_drained().await;
effect_handler.notify_receiver_drained().await?;

    _ = sleep(remaining_time) => {}
}
Member


The current sleep makes the receiver responsive to DrainIngress/Shutdown, but it also changes the rate-limiting behavior: any non-terminal control message handled as Ok(None) exits the sleep immediately, so the next batch can be sent before the original wait_till. We should replace lines 445-456 above with:

// Keep the original sleep deadline even if non-terminal control
// messages arrive. Only DrainIngress/Shutdown should interrupt
// the rate-limit wait early.
let sleep_until = sleep(remaining_time);
tokio::pin!(sleep_until);

loop {
    tokio::select! {
        biased;
        ctrl_msg = ctrl_msg_recv.recv() => {
            if let Some(terminal) =
                handle_control_msg(ctrl_msg, &effect_handler, &mut self.metrics).await?
            {
                return Ok(terminal);
            }
        }
        _ = &mut sleep_until => break,
    }
}

Contributor Author

@sjmsft sjmsft Apr 8, 2026


Please check the new changes + new test.

@lalitb lalitb self-requested a review April 8, 2026 22:00
github-merge-queue bot pushed a commit that referenced this pull request Apr 8, 2026
# Change Summary
Adds a necessary Dockerfile line to fix the build.
Adds a test to our CI/CD workflow, which would have caught this in
#2597.
This was going to block #2515

---------

Co-authored-by: Copilot <[email protected]>
Member

@lalitb lalitb left a comment


Thanks.

@jmacd jmacd enabled auto-merge April 8, 2026 23:49
@jmacd jmacd added this pull request to the merge queue Apr 9, 2026
Merged via the queue into open-telemetry:main with commit 9b4b8dc Apr 9, 2026
69 of 70 checks passed

Labels

rust Pull requests that update Rust code

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

pipeline-perf-test-basic unit test is failing

4 participants